
Conversation

@halspang
Member

@halspang halspang commented Mar 5, 2025

This commit adds a timeout to the gRPC stream used to communicate with the backend. This was done because the backend could restart and drop the connection without the worker knowing, leaving the worker hung and unable to receive any new work items. The fix is to reset the connection if enough time has passed without receiving anything on the stream.
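For illustration, a minimal sketch of the idle-timeout pattern, assuming a linked CancellationTokenSource and a hypothetical streamTimeout value (the names here are illustrative, not the actual SDK members):

```csharp
// Sketch only: an idle timeout linked to the worker's shutdown token.
// If nothing arrives on the stream for `streamTimeout`, the linked token
// fires, the read loop ends, and the worker can rebuild the connection.
using var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellation);
tokenSource.CancelAfter(streamTimeout);

await foreach (P.WorkItem workItem in stream.ResponseStream.ReadAllAsync(tokenSource.Token))
{
    tokenSource.CancelAfter(streamTimeout); // receiving anything resets the idle timer
    // ... dispatch the work item ...
}
```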


Signed-off-by: halspang <[email protected]>
@halspang halspang force-pushed the halspang/streaming_timeout branch from 14951b4 to a098203 Compare March 5, 2025 23:43
@nytian nytian requested a review from jviau March 5, 2025 23:55
```diff
 while (!cancellation.IsCancellationRequested)
 {
-    await foreach (P.WorkItem workItem in stream.ResponseStream.ReadAllAsync(cancellation))
+    await foreach (P.WorkItem workItem in stream.ResponseStream.ReadAllAsync(tokenSource.Token))
```
Member

Would this not throw if the connection is closed?

Member Author

We thought it would too, but if you go into the IAsyncStreamReader, a cancellation is actually just treated as the end of the stream and it returns normally.

```csharp
    }
}

if (tokenSource.IsCancellationRequested || tokenSource.Token.IsCancellationRequested)
```
Member

Maybe I am missing something, but it seems unlikely this line would ever be true. If IsCancellationRequested is set, then more likely than not stream.ResponseStream.ReadAllAsync would throw an OperationCanceledException.

Member Author

See above. We thought this behavior was an odd choice for the stream reader as well, but it's documented that it doesn't throw.
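
For reference, ReadAllAsync is essentially a thin wrapper over MoveNext, roughly like the following (a simplified approximation of Grpc.Core's AsyncStreamReaderExtensions, not the exact source):

```csharp
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using Grpc.Core;

public static class StreamReaderSketch
{
    // Approximation only. Per the behavior described above, a cancelled token
    // surfaces here as MoveNext completing (end of stream) rather than as an
    // exception, so the consuming await-foreach simply exits the loop.
    public static async IAsyncEnumerable<T> ReadAllSketchAsync<T>(
        this IAsyncStreamReader<T> reader,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        while (await reader.MoveNext(cancellationToken).ConfigureAwait(false))
        {
            yield return reader.Current;
        }
    }
}
```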

@jviau
Member

jviau commented Mar 6, 2025

Can you explain the observed code flow when the scheduler shuts down? Is an exception thrown? If so, the call to ProcessWorkItemsAsync is wrapped in many catch statements - is one of them hit? Do we misconstrue this as a worker shutdown?

I would expect this line to be hit; is that not the case?

@halspang
Member Author

halspang commented Mar 6, 2025

> Can you explain the observed code flow when the scheduler shuts down? Is an exception thrown? If so, the call to ProcessWorkItemsAsync is wrapped in many catch statements - is one of them hit? Do we misconstrue this as a worker shutdown?

It doesn't throw an exception; it returns as if the stream had just ended normally. So, once the cancellation is triggered, the foreach loop exits and we check the token. If the token is cancelled, we return. If the overall cancellation token was cancelled, it will exit at that level as well. If not, it creates a new connection to the scheduler.

https://grpc.github.io/grpc/csharp/api/Grpc.Core.IAsyncStreamReader-1.html
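
Roughly, the reconnect flow described here looks like the following sketch (the helper name is hypothetical, not the actual SDK code):

```csharp
// Sketch of the outer loop: if the read method returns without the overall
// cancellation being requested, the stream ended (backend restart, dropped
// connection, or idle timeout) and a new connection is opened on the next pass.
while (!cancellation.IsCancellationRequested)
{
    // Hypothetical helper: opens a stream to the scheduler and reads work items
    // until the stream ends, the idle timeout fires, or the worker shuts down.
    await this.ConnectAndProcessWorkItemsAsync(cancellation);
}
```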

Member

@jviau jviau left a comment

Approving to unblock, but I think this is treating a symptom and not the underlying problem. If a scheduler restart isn't closing the connection to the worker, thus ending the stream, something else is keeping that alive (such as a reverse proxy). This needs to be looked at and addressed, as it is violating some fundamental expectations of gRPC streams.

I would feel slightly better if we make this privately configurable by the AzureManaged package somehow. We already have some pseudo-internal options they use here; we could add to that and have this behavior only enabled for DTS.

Signed-off-by: halspang <[email protected]>
@cgillum
Member

cgillum commented Mar 6, 2025

I think this is treating a symptom and not the underlying problem

Agreed, but I think this is a safety mechanism we want anyway, regardless of which gRPC server implementation we're targeting, so I'm happy to go with this for now. Warning logs have been added so that we can observe this behavior and be reminded that it still needs to be root caused.

@cgillum cgillum merged commit 7747663 into microsoft:main Mar 6, 2025
4 checks passed